Annotation of multiplexer/dispatch.c, revision 1.6
1.1 oskar 1: /*
2: * ISO 13818 stream multiplexer
3: * Copyright (C) 2001 Convergence Integrated Media GmbH Berlin
4: * Author: Oskar Schirmer (oskar@convergence.de)
5: */
6:
7: /*
8: * Module: Dispatch
9: * Purpose: Main dispatching loop.
10: *
11: * The first two buffer stages (raw file data and input stream data)
12: * are filled greedily, the following two stages (from input stream to
13: * output data buffer, and further to stdout) are timing controlled.
14: */
15:
16: #include "global.h"
17: #include "error.h"
18: #include "splice.h"
19: #include "input.h"
20: #include "output.h"
21: #include "command.h"
22: #include "dispatch.h"
23:
/* Dispatcher control flags; all three are reset to FALSE by dispatch_init(). */
/* Tested by dispatch(): once TRUE, both the main loop and the final
 * output drain loop stop. */
24: boolean fatal_error;
/* Tested by dispatch(): once TRUE, the main loop stops; the final
 * output drain loop does not test it. */
25: boolean force_quit;
/* Tested by dispatch(): while TRUE, the main loop keeps running even if
 * none of its other continuation conditions hold. */
26: boolean busy_work;
27:
28: boolean dispatch_init (void)
29: {
30: fatal_error = FALSE;
31: force_quit = FALSE;
32: busy_work = FALSE;
33: return (TRUE);
34: }
35:
1.3 oskar 36: /* Dispatch work to the modules as needed.
37: * Mainly, check a few internal conditions (buffer space,
1.4 oskar 38: * data availability), check the corresponding files with
1.3 oskar 39: * poll for readiness, then do something at the appropriate
40: * points to push data forward.
41: * Data is pushed thru the multiplexer unidirectionally:
42: * input --> f.rawdata --> s.pesdata --> o.spliceddata --> output
43: * The main loop continues as long as there is something to do.
44: * Thereafter, the last two stages may generate a finish and
45: * another loop tries to pump the buffers out.
46: */
1.1 oskar 47: void dispatch (void)
48: {
49: boolean bi, bo, bs;
50: stream_descr *st;
1.2 oskar 51: t_msec tmo;
1.1 oskar 52: unsigned int nfds, onfds, infds;
1.5 oskar 53: int pollresult;
1.1 oskar 54: struct pollfd ufds [MAX_POLLFD];
55: warn (LDEB,"Dispatch",EDIS,0,0,0);
56: bs = FALSE;
57: st = input_available ();
58: nfds = 0;
59: command_expected (&nfds, &ufds[0]);
60: onfds = nfds;
61: bo = output_available (&nfds, &ufds[onfds], &tmo);
62: while ((bo
63: || bs
64: || (st != NULL)
65: || input_expected ()
66: || ((tmo >= 0) && (tmo <= MAX_MSEC_OUTDELAY))
67: || busy_work)
68: && (!fatal_error)
69: && (!force_quit)) {
70: infds = nfds;
71: bi = input_acceptable (&nfds, &ufds[infds], &tmo, output_acceptable ());
1.4 oskar 72: if ((bs)
73: || ((st != NULL) && output_acceptable ())) {
1.1 oskar 74: tmo = 0;
75: }
76: warn (LDEB,"Poll",EDIS,0,1,tmo);
77: #ifdef DEBUG_TIMEPOLL
78: ltp->tmo = tmo;
79: if (ltp->usec != 0) {
80: struct timeval tv;
81: gettimeofday (&tv,NULL);
82: ltp->usec -= tv.tv_usec;
83: }
84: ltp->nfdso = infds - onfds;
85: ltp->nfdsi = nfds - infds;
86: ltp->flags |=
87: (bo ? LTP_FLAG_OUTPUT : 0) |
88: (bi ? LTP_FLAG_INPUT : 0) |
89: (bs ? LTP_FLAG_SPLIT : 0) |
90: (st != NULL ? LTP_FLAG_PROCESS : 0);
91: ltp->sr = deb_inraw_free (0);
92: ltp->si = deb_instr_free (1);
93: ltp->so = output_free ();
94: ltp->nfdsrevent =
95: #endif
1.5 oskar 96: pollresult =
97: poll (&ufds[0], nfds, ((!timed_io) && (tmo > 0)) ? 0 : tmo);
98: if ((!timed_io) && (tmo > 0)) {
99: if (pollresult == 0) {
100: global_delta += tmo;
101: warn (LDEB,"Global Delta",EDIS,0,3,global_delta);
102: #ifdef DEBUG_TIMEPOLL
103: ltp->flags |= LTP_FLAG_DELTASHIFT;
104: #endif
105: }
106: }
1.1 oskar 107: #ifdef DEBUG_TIMEPOLL
108: if (logtpc < (max_timepoll-1)) {
109: struct timeval tv;
110: logtpc += 1;
111: ltp++;
112: gettimeofday (&tv,NULL);
113: ltp->usec = tv.tv_usec;
114: }
115: #endif
116: warn (LDEB,"Poll done",EDIS,0,2,nfds);
117: if ((0 < onfds)
118: && (ufds[0].revents & (POLLIN | POLLHUP | POLLERR))) {
119: command_process (ufds[0].revents & POLLIN);
120: }
121: if (bo
122: && (ufds[onfds].revents & (POLLOUT | POLLHUP | POLLERR))) {
123: output_something (ufds[onfds].revents & POLLOUT);
124: }
1.5 oskar 125: output_gen_statistics ();
1.1 oskar 126: if (bi) {
127: while (infds < nfds) {
128: if (ufds[infds].revents & (POLLIN | POLLHUP | POLLERR)) {
129: input_something (input_filehandle (ufds[infds].fd),
130: ufds[infds].revents & POLLIN);
131: bi = FALSE;
132: bs = TRUE;
133: }
134: infds += 1;
135: }
136: if (!bi) {
137: if (st == NULL) {
138: st = input_available ();
139: }
140: }
141: }
142: if (bs) {
143: bs = split_something ();
144: }
145: if ((st != NULL) && output_acceptable ()) {
146: st = process_something (st);
147: bs = TRUE;
148: }
149: if (st == NULL) {
150: st = input_available ();
151: }
152: nfds = 0;
153: command_expected (&nfds, &ufds[0]);
154: onfds = nfds;
155: bo = output_available (&nfds, &ufds[onfds], &tmo);
1.6 ! oskar 156: splice_all_configuration ();
1.1 oskar 157: }
158: process_finish ();
1.2 oskar 159: output_finish ();
1.1 oskar 160: while ((output_available (&nfds, &ufds[0], &tmo)
161: || (tmo >= 0))
162: && (!fatal_error)) {
163: output_something (TRUE);
164: }
165: #ifdef DEBUG_TIMEPOLL
166: {
167: int i, u, s;
168: i = 0;
169: s = 0;
1.6 ! oskar 170: fprintf (stderr, "lines: %8d\n", (int)logtpc);
1.1 oskar 171: while (i < logtpc) {
172: u = (logtp[i].usec > 0 ? 1000000 : 0) - logtp[i].usec;
173: s += u;
174: fprintf (stderr,
175: "%08d %10d.%06d:%8d (%6d) %c%5d %d %d/%d %c%c%c%c %d (F:%6d,S:%6d,O:%6d)\n",
176: i,
177: (int)logtp[i].tv.tv_sec,
178: (int)logtp[i].tv.tv_usec,
179: logtp[i].msec_now,
180: u,
181: logtp[i].flags & LTP_FLAG_DELTASHIFT ? 'D' : ' ',
182: logtp[i].tmo,
183: logtp[i].cnt_msecnow,
184: logtp[i].nfdsi,
185: logtp[i].nfdso,
186: logtp[i].flags & LTP_FLAG_INPUT ? 'I' : ' ',
187: logtp[i].flags & LTP_FLAG_SPLIT ? 'S' : ' ',
188: logtp[i].flags & LTP_FLAG_PROCESS ? 'P' : ' ',
189: logtp[i].flags & LTP_FLAG_OUTPUT ? 'O' : ' ',
190: logtp[i].nfdsrevent,
191: logtp[i].sr,
192: logtp[i].si,
193: logtp[i].so
194: );
195: i += 1;
196: }
1.6 ! oskar 197: fprintf (stderr, "%43d\n", s);
1.1 oskar 198: }
199: #endif
200: }
201:
LinuxTV legacy CVS <linuxtv.org/cvs>